[3/N] DynamicListenerAgentFamily — cluster-coordinated dynamic listeners (GH-2685)#2702
Merged
jeremydmiller merged 1 commit intomainfrom May 7, 2026
Merged
Conversation
…stered listener URIs [3/N] Third slice of GH-2685. The previous PRs (#2699 foundation, #2700 RDBMS storage) gave us a durable place to persist listener URIs and the opt-in flag to gate it. This PR connects that storage to the cluster's agent runtime so registered URIs become live listeners on exactly one node — the user-visible API for "add an MQTT broker at runtime and have the cluster activate it" the original issue called for. What's in this PR: - New `Wolverine.Runtime.Agents.DynamicListenerAgentFamily` — `IAgentFamily` implementation that re-reads `IMessageStore.Listeners.AllListenersAsync()` on every assignment cycle (no boot-time snapshot, so newly registered URIs are picked up within one polling interval, default 30s) and balances the resulting agents across the cluster via `assignments.DistributeEvenly(Scheme)`. Same pattern as the existing `ExclusiveListenerFamily` and `StickyPostgresqlQueueListenerAgentFamily`. - New `Wolverine.Runtime.Agents.DynamicListenerAgent` — single-URI `IAgent` whose StartAsync resolves the listener URI's scheme to a registered transport via `Options.Transports.ForScheme(...)`, asks the transport to materialize an `Endpoint` from the URI, and hands it to `IEndpointCollection.StartListenerAsync`. Failure to resolve the transport surfaces as a hard `InvalidOperationException` with a hint pointing at `UseMqtt` / `UseRabbitMq` / etc., so a misconfigured host doesn't silently never run the listener. - New `Wolverine.Runtime.Agents.DynamicListenerUriEncoding` — lossless round-trip between a listener URI and an agent URI under the reserved `wolverine-dynamic-listener:///` scheme (percent-encoded payload as the path, empty authority so embedded `://` in the listener URI doesn't collide with the agent URI's own parsing). The cluster keys agents by their URI, so the encoding has to be deterministic — covered by parameterized round-trip tests. - `NodeAgentController` registers the family when both `Durability.Mode == Balanced` AND `Durability.EnableDynamicListeners == true`. Off by default; Solo / MediatorOnly / Serverless modes don't get cluster assignment so the family wouldn't help them anyway. - New `IWolverineRuntime.RegisterListenerAsync` / `RemoveListenerAsync` / `AllRegisteredListenersAsync` extension methods in `Wolverine.Runtime.WolverineRuntimeListenerExtensions` so user code calls `runtime.RegisterListenerAsync(uri)` rather than `runtime.Storage.Listeners.RegisterListenerAsync(uri)`. Thin pass-through; no transformation — the public surface is just to keep dependency reach narrower in user code. Tests: - `DynamicListenerUriEncodingTests` — 6 round-trip cases covering MQTT topic wildcards, userinfo + port, query strings; plus determinism + scheme-mismatch + empty-path guards. 9/9. - `DynamicListenerAgentFamilyTests` — verifies empty-store → empty agents, listener URIs project through the encoder, the store is re-read on every call (no caching), and BuildAgentAsync decodes back to a `DynamicListenerAgent`. 6/6. - `WolverineRuntimeListenerExtensionsTests` — pins the public pass-through contract and argument-null guards. 7/7. - `dynamic_listener_agent_lifecycle_integration` — spins up a real WolverineHost in solo mode, drives an agent through Start/Stop, asserts the listener actually goes live via `runtime.Endpoints.FindListeningAgent(uri)`. Also covers the no-transport error path and the "stop before start" tolerance needed for cluster reassignment edge cases. 3/3. Regression checks: - CoreTests `Runtime.Agents`: 88/88 (existing agent family suite plus the new 25 dynamic-listener tests). - CoreTests `Persistence.Durability`: 17/17 (foundation defaults unchanged). - PostgresqlTests `PostgresqlMessageStoreTests`: 48/48 (RDBMS-backed listener store from PR #2700 still green). Deferred to follow-up: - Marten-backed `IListenerStore`. - MQTT-specific multiplexed listener (one shared MqttListener + BufferedReceiver, N broker subscriptions) for the 10K-IoT case. - IWolverineRuntime topic-name → Uri convenience extensions. - Documentation in MQTT and Durability guides. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Third slice of #2685. Connects the durable listener registry from PR #2700 to the cluster's agent runtime so registered URIs become live listeners on exactly one node — the user-visible surface for "add an MQTT broker at runtime and have the cluster activate it" the original issue called for.
What's in this PR
DynamicListenerAgentFamily— re-readsIMessageStore.Listeners.AllListenersAsync()on every assignment cycle (no boot snapshot, so newly registered URIs are picked up within one polling interval, default 30s) and balances the agents across the cluster viaassignments.DistributeEvenly(Scheme). Same shape asExclusiveListenerFamily/StickyPostgresqlQueueListenerAgentFamily.DynamicListenerAgent— single-URIIAgent.StartAsyncresolves the listener URI's scheme to a registered transport viaOptions.Transports.ForScheme(...), asks the transport to materialize anEndpointfrom the URI, and hands it toIEndpointCollection.StartListenerAsync. Failure to resolve surfaces asInvalidOperationExceptionwith a hint atUseMqtt/UseRabbitMq/ etc.DynamicListenerUriEncoding— lossless round-trip between listener URI and agent URI under the reservedwolverine-dynamic-listener:///scheme. Empty authority + percent-encoded path keeps the embedded://from colliding with the agent URI's own parsing. Determinism is required — the cluster keys agents by URI, so the encoding has to produce the same agent URI for the same listener URI on every poll.NodeAgentControllerregisters the family when bothDurability.Mode == BalancedANDDurability.EnableDynamicListeners == true. Off by default. Solo / MediatorOnly / Serverless modes don't get cluster assignment so the family wouldn't help them anyway.IWolverineRuntime.RegisterListenerAsync/RemoveListenerAsync/AllRegisteredListenersAsyncextension methods so user code callsruntime.RegisterListenerAsync(uri)rather than reaching throughruntime.Storage.Listeners. Thin pass-through.Test plan
DynamicListenerUriEncodingTests— 9/9. Parameterized round-trip cases (MQTT topic wildcards, userinfo+port, query strings) + determinism + scheme-mismatch + empty-path guards.DynamicListenerAgentFamilyTests— 6/6. Empty-store → empty agents; listener URIs project through encoder; store re-read on every call (no caching);BuildAgentAsyncdecodes back to aDynamicListenerAgent.WolverineRuntimeListenerExtensionsTests— 7/7. Pass-through contract + arg-null guards.dynamic_listener_agent_lifecycle_integration— 3/3. RealWolverineHostin solo mode; drives an agent through Start/Stop; asserts listener actually goes live viaruntime.Endpoints.FindListeningAgent(uri). Covers no-transport error path + "stop before start" tolerance.Runtime.Agents: 88/88.Persistence.Durability: 17/17 (foundation defaults unchanged).PostgresqlMessageStoreTests: 48/48 (PR [2/N] real RDBMS-backed IListenerStore (GH-2685) #2700 storage still green).Deferred to follow-up PR (final piece of #2685)
IListenerStore.MqttListener+BufferedReceiver, N broker subscriptions) — Lau's 10K-IoT case.IWolverineRuntimetopic-name → Uri convenience extensions for MQTT.🤖 Generated with Claude Code